package internal

import (
	"gitee.com/haozing/wim/pkg/queue2"
	"gitee.com/haozing/wim/pkg/sync2"
	"gitee.com/haozing/wim/session/iface"
	"github.com/golang/glog"
	"sync"
	"time"
)

const (
	keyIdNew     = 0
	keyLoaded    = 1
	unauthorized = 2
	userIdLoaded = 3
	offline      = 4
	closed       = 5
)

type RpcApiMessages struct {
	sessionId int64
	messages  interface{}
}

func (r *RpcApiMessages) SessionId() int64 {
	return r.sessionId
}
func (r *RpcApiMessages) SetSessionId(i int64) {
	r.sessionId = i
}
func (r *RpcApiMessages) RpcMessages() interface{} {
	return r.messages
}

func (r *RpcApiMessages) SetRpcMessages(i interface{}) {
	r.messages = i
}

type AuthUser struct {
	authKeyId     int64  //唯一Id，也可以用userID替代
	authKey       []byte //密钥
	userId        int32  //绑定的用户
	cacheSalt     iface.IFutureSalt
	cacheLastSalt iface.IFutureSalt        //最后的盐
	sessions      map[int64]iface.ISession //session列表
	closeChan     chan struct{}            //关闭信号
	PendingMsg    chan interface{}         // 用户所有端待处理信息[客户端待处理消息，推送端待处理消息]
	rpcDataChan   chan interface{}         // 逻辑处理后的信息，经过再次处理，发往前端网关
	rpcQueue      *queue2.SyncQueue        // 从客户端接收到的需要往api发送的信息
	finish        sync.WaitGroup
	running       sync2.AtomicInt32
	state         int   //状态。有三个，初始化，绑定authKeyId,绑定uid
	onlineExpired int64 //60s在线时间
	pushSessionId int64
	service       iface.IServer
	context       map[string]interface{} //上下文或者附加内容
	//tcp或者websocket地址
}

func (s *AuthUser) Context(key string) interface{} {
	return s.context[key]
}
func (s *AuthUser) SetContext(key string, i interface{}) {
	//todo 会不会有竞争问题？
	s.context[key] = i
}
func (s *AuthUser) CacheSalt() iface.IFutureSalt {
	return s.cacheSalt
}
func (s *AuthUser) SetCacheSalt(f iface.IFutureSalt) {
	s.cacheSalt = f
}
func (s *AuthUser) CacheLastSalt() iface.IFutureSalt {
	return s.cacheLastSalt
}
func (s *AuthUser) SetCacheLastSalt(f iface.IFutureSalt) {
	s.cacheLastSalt = f
}
func (s *AuthUser) AuthKey() []byte {
	return s.authKey
}
func (s *AuthUser) SetAuthKey(authKey []byte) {
	s.authKey = authKey
}
func (s *AuthUser) Server() iface.IServer {
	return s.service
}

func (s *AuthUser) UserId() int32 {
	return s.userId
}

func (s *AuthUser) SetUserId(userId int32) {
	s.userId = userId
	s.onBindUser(userId)
}

func (s *AuthUser) Destroy(sessionId int64) bool {
	if _, ok := s.sessions[sessionId]; ok {
		// s.updates.onGenericSessionClose(sess)
		delete(s.sessions, sessionId)
	} else {
		//
	}
	return true
}

func (s *AuthUser) SendToRpcQueue(rpcMessage iface.IRpcApiMessages) {
	glog.Infof("rpcQueue.Push", rpcMessage)
	s.rpcQueue.Push(rpcMessage)
}

func (s *AuthUser) BindPushSessionId(sessionId int64) {
	if s.pushSessionId == 0 {
		s.pushSessionId = sessionId
	}
}

func (s *AuthUser) SetOffline() {
	for _, sess := range s.sessions {
		if (sess.Type() == iface.KSessionGeneric || sess.Type() == iface.KSessionPush) && sess.Online() {
			return
		}
	}
	err := s.service.Event().SetOffline(s)
	if err != nil {
		return
	}
	s.onlineExpired = 0
}

func MakeAuthUser(authKeyId int64, service iface.IServer) *AuthUser {
	ss := &AuthUser{
		authKeyId:   authKeyId,
		sessions:    make(map[int64]iface.ISession),
		closeChan:   make(chan struct{}),
		PendingMsg:  make(chan interface{}, 1024),
		rpcDataChan: make(chan interface{}, 1024),
		rpcQueue:    queue2.NewSyncQueue(),
		finish:      sync.WaitGroup{},
		state:       keyIdNew,
		service:     service,
		context:     make(map[string]interface{}, 1024),
	}
	return ss
}
func (s *AuthUser) GetOnlineSession(sessionType int) iface.ISession {
	var (
		lastReceiveTime int64 = 0
		lastSession     iface.ISession
	)

	for _, sess := range s.sessions {
		if sess.Type() == sessionType && sess.Online() && sess.LastReceiveTime() >= lastReceiveTime {
			lastSession = sess
			lastReceiveTime = sess.LastReceiveTime()
		}
	}

	return lastSession
}

func (s *AuthUser) AuthKeyId() int64 {
	return s.authKeyId
}

func (s *AuthUser) Start() {
	s.running.Set(1)
	s.finish.Add(1)
	go s.rpcRunLoop()
	go s.runLoop()
}

//启动监听，接收从前端网关(comet)传递过来或要传递过去的消息。
//如果1s内没有消息传递，则执行Timer()，影响在线状态。
func (s *AuthUser) runLoop() {
	defer func() {
		s.finish.Done()
		close(s.closeChan)
		s.finish.Wait()
	}()

	for s.running.Get() == 1 {
		select {
		case <-s.closeChan:
			// log.Info("runLoop -> To Close ", this.String())
			return
		case sessionMsg, _ := <-s.PendingMsg:
			s.Session(sessionMsg)
		case rpcMessages, _ := <-s.rpcDataChan:
			results, _ := rpcMessages.(iface.IRpcApiMessages)
			s.OnRpcResult(results)
		case <-time.After(time.Second):
			s.Timer()
		}
	}
}

//监听rpcQueue中的消息
//rpcQueue存放通过Event自定义处理的消息，将消息送往逻辑处理服务(一般是RPC外部服务)。
func (s *AuthUser) rpcRunLoop() {
	for {
		apiRequests := s.rpcQueue.Pop()
		if apiRequests == nil {
			glog.Info("quit rpcRunLoop...")
			return
		} else {
			requests, _ := apiRequests.(iface.IRpcApiMessages)
			s.OnRpcRequest(requests)
		}
	}
}

// Timer 1秒内同前端网关无消息传递交互(包括收、发)，则执行。
func (s *AuthUser) Timer() {
	for _, sess := range s.sessions {
		sess.InSession().Timer()
	}

	//只有有一个在线的，就不用关闭
	for _, sess := range s.sessions {
		if !sess.Closed() {
			return
		}
	}

	s.service.CloseAuthUser(s.authKeyId)
}

// Session 根据sessionId创建或者获取session和自定义的InSession，并将自定义InSession放入AuthUser的sessions中。
//需要说明的是，一个用户可能会有多个session。
//最后，将执行InSession中的Dispatch()方法。
//Dispatch()执行后，按照流程，应该将处理后的数据，放入AuthUser中rpcQueue里，确保消息往下流转。
func (s *AuthUser) Session(msg interface{}) {

	var sess iface.ISession
	var inSess iface.InSession
	//初始化session
	inSess, sess = s.service.Event().InitSession(s, msg)
	if inSess != nil && inSess.SessionId() != 0 && inSess.SessionType() != 0 {
		if s.sessions[inSess.SessionId()] == nil {
			if sess == nil {
				sess = newSession(inSess.SessionId(), inSess.SessionType(), s, inSess)
			}
			s.sessions[inSess.SessionId()] = sess
		}
	}
	//解码
	sessionId, decodeMsg := s.service.Event().MsgDecodeHandler(s, msg)
	if sessionId == 0 || decodeMsg == nil {
		return
	}
	sess = s.sessions[sessionId]
	if sess == nil {
		glog.Error("no session found")
		return
	}
	inSess = s.sessions[sessionId].InSession()
	var ApiMessages = RpcApiMessages{}
	ApiMessages.SetSessionId(inSess.SessionId())
	ApiMessages.SetRpcMessages(decodeMsg)
	dispatch, err := inSess.Dispatch(sess, &ApiMessages)
	if err != nil {
		glog.Errorf("dispatch is err,", err)
		return
	}
	if dispatch != nil {
		s.SendToRpcQueue(dispatch)
	}
}

// MsgJoin 将信息放入PendingMsg通道
func (s *AuthUser) MsgJoin(buf interface{}) error {
	select {
	case s.PendingMsg <- buf:
		return nil
	}
}
func (s *AuthUser) Stop() {
	s.running.Set(0)
	s.rpcQueue.Close()
}
func (s *AuthUser) onBindUser(userId int32) {
	s.state = userIdLoaded
	s.userId = userId
}
func (s *AuthUser) SetOnline() {
	date := time.Now().Unix()
	if (s.onlineExpired == 0 || date > s.onlineExpired-iface.KPingAddTimeout) && s.userId != 0 {
		// log.Info("DEBUG] setOnline - set online ", s.onlineExpired)
		err := s.service.Event().SetOnline(s)
		if err != nil {
			return
		}
		s.onlineExpired = time.Now().Unix() + 60
	} else {
		// log.Info("DEBUG] setOnline - not set online ", s.onlineExpired)
	}
}

// OnRpcResult 消息流转的最后一步，下一步将消息发送给前端网关(comet)
func (s *AuthUser) OnRpcResult(rpcResults iface.IRpcApiMessages) {
	if sess, ok := s.sessions[rpcResults.SessionId()]; ok {
		sess.InSession().OnRpcResult(sess, rpcResults)
	} else {
		glog.Warning("onRpcResult - not found rpcSession by sessionId: ", rpcResults.SessionId())
	}
}

// OnRpcRequest 需要调用逻辑服务处理的东西，比如：注册
func (s *AuthUser) OnRpcRequest(requests iface.IRpcApiMessages) {

	var rpcMessageList interface{}
	if sess, ok := s.sessions[requests.SessionId()]; ok {
		rpcMessageList = sess.InSession().OnRpcRequest(sess, requests)
	} else {
		glog.Warning("onRpcResult - not found rpcSession by sessionId: ", requests.SessionId())
		return
	}
	requests.SetRpcMessages(rpcMessageList)
	s.rpcDataChan <- requests
}

func (s *AuthUser) GetSessionByConnId(connId int64) iface.ISession {
	for _, sess2 := range s.sessions {
		if sess2.IdExist(connId) {
			return sess2
		}
	}
	return nil
}
